package com.itcast.hadoop.ii;/**
 * Created by Administrator on 2019/4/8 0008.
 */

import com.itcast.hadoop.flowsort.SortMR;
import com.itcast.hadoop.flowsum.FlowBean;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Step one of the inverted-index job: counts how often each word occurs in each input file.
 *
 * <p>The mapper emits {@code <word-->fileName, 1>} for every token, and the reducer sums
 * those ones, so every output record is {@code <word-->fileName, occurrences>}.
 *
 * <p>Usage: {@code InverseIndexStepOne <input path> <output path>}. An existing output path
 * is deleted recursively before the job is submitted.
 *
 * @author ydf
 * @com kt
 * @create 2019-04-08 11:33 AM
 **/
public class InverseIndexStepOne {

    /**
     * Splits each input line on spaces and emits {@code <word-->fileName, 1>} per token.
     */
    public static class StepOneMapper extends Mapper<LongWritable,Text,Text,LongWritable> {

        /** Constant count written for every token; shared to avoid per-record allocation. */
        private static final LongWritable ONE = new LongWritable(1);

        /** Reused output key; safe because {@code context.write} serializes it on each call. */
        private final Text outKey = new Text();

        /** Name of the file backing this task's input split, resolved once in {@link #setup}. */
        private String fileName;

        /**
         * Resolves the name of the file this task reads from.
         *
         * @param context task context supplying the input split
         * @throws ClassCastException if the input split is not a {@link FileSplit}
         *         (i.e. the job uses a non file-based input format)
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The split is fixed for the whole task, so look the file name up only once.
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            fileName = inputSplit.getPath().getName();
        }

        /**
         * Emits one {@code <word-->fileName, 1>} pair per space-separated token of the line.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of input text
         * @param context sink for the emitted pairs
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // commons-lang split treats adjacent separators as one, so no empty tokens are produced.
            String[] fields = StringUtils.split(value.toString(), " ");
            for (String field : fields) {
                // Output K: hello-->a.txt  V: 1
                outKey.set(field + "-->" + fileName);
                context.write(outKey, ONE);
            }
        }
    }

    /**
     * Sums the counts of one {@code word-->fileName} key. Input and output types are identical
     * and addition is associative, so this class is also registered as the job's combiner.
     */
    public static class StepOneReducer extends Reducer<Text,LongWritable,Text,LongWritable>{

        /** Reused output value; safe because {@code context.write} serializes it on each call. */
        private final LongWritable total = new LongWritable();

        /**
         * Adds up all partial counts for {@code key}, e.g. {@code <hello-->a.txt, {1,1,1,...}>}.
         *
         * @param key     the {@code word-->fileName} key
         * @param values  partial counts for that key
         * @param context sink for the summed record
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long counter = 0;
            for (LongWritable value : values) {
                counter += value.get();
            }
            total.set(counter);
            context.write(key, total);
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success or 1 on failure.
     * Exits with 2 and a usage message when the two required paths are not supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException if the output path cannot be cleared or the job cannot be submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException if waiting for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: InverseIndexStepOne <input path> <output path>");
            System.exit(2);
            return;
        }

        // Create the configuration and the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Class used to locate the job jar
        job.setJarByClass(InverseIndexStepOne.class);

        // Map / combine / reduce classes. The reducer is a plain sum, so it doubles as combiner
        // to shrink the shuffled data without changing the final output.
        job.setMapperClass(StepOneMapper.class);
        job.setCombinerClass(StepOneReducer.class);
        job.setReducerClass(StepOneReducer.class);

        // Key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Key/value types emitted by the reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input path of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Output path of the job. Resolve the file system from the path itself so that a
        // fully qualified URI on a non-default file system is handled correctly.
        Path output = new Path(args[1]);
        FileSystem fs = output.getFileSystem(configuration);
        // Remove a pre-existing output path, otherwise the job refuses to start
        if (fs.exists(output) && !fs.delete(output, true)) {
            throw new IOException("Failed to delete existing output path: " + output);
        }
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
